package com.deep.test

package com.zhen.hudi.spark

import org.apache.hudi.QuickstartUtils.DataGenerator
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

/**
 * https://www.cnblogs.com/EnzoDin/p/15933687.html
 * @Author FengZhen
 * @Date 2/17/22 9:09 PM
 * @Description
 * Hudi数据湖框架，基于spark计算引擎，对数据进行CRUD操作，使用官方模拟生成出租车出行数据
 * 任务一：模拟数据，插入Hudi表，采用COW模式
 * 任务二：快照方式查询(Snapshot Query)，采用DSL方式
 * 任务三：更新(update)数据
 * 任务四：增量查询数据(Incremental Query)，采用SQL方式
 * 任务五：删除(Delete)数据
 *
 * cow类型表
 * 写入数据时，如果该rowkey已经存在，则复制原字段
 * 查询
 * 快照查询Snapshot[默认]：
 * 默认加载最近一个版本的数据
 * 通过参数 hoodie.datasource.query.type可以进行设置
 * val QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type"
 * val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
 * val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
 * val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"
 * val DEFAULT_QUERY_TYPE_OPT_VAL: String = QUERY_TYPE_SNAPSHOT_OPT_VAL
 * 增量查询Incremental：
 * 指定一个时间戳，查询时间戳之后的增量数据
 * 如果是incremental增量查询，需要指定时间戳，
 * 当hudi表中数据满足：instant_time > beginTime时，数据将会被夹在读取
 * 此外，可设置某个时间范围：endTime > instant_time > beginTime
 * val BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime"
 * val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"
 *
 * 删除数据
 * 需要设置属性参数：hoodie.datasource.write.operation : delete
 * val OPERATION_OPT_KEY = "hoodie.datasource.write.operation"
 * val BULK_INSERT_OPERATION_OPT_VAL = WriteOperationType.BULK_INSERT.value
 * val INSERT_OPERATION_OPT_VAL = WriteOperationType.INSERT.value
 * val UPSERT_OPERATION_OPT_VAL = WriteOperationType.UPSERT.value
 * val DELETE_OPERATION_OPT_VAL = WriteOperationType.DELETE.value
 * val BOOTSTRAP_OPERATION_OPT_VAL = WriteOperationType.BOOTSTRAP.value
 * val INSERT_OVERWRITE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE.value
 * val INSERT_OVERWRITE_TABLE_OPERATION_OPT_VAL = WriteOperationType.INSERT_OVERWRITE_TABLE.value
 * val DEFAULT_OPERATION_OPT_VAL = UPSERT_OPERATION_OPT_VAL
 *
 *
 */
object HudiSparkDemo {


  def main(args: Array[String]): Unit = {

    //创建SparkSession实例对象，设置属性
    val spark: SparkSession = {
      SparkSession.builder()
        .appName(this.getClass.getSimpleName.stripSuffix("$"))
        .master("local[2]")
        //设置序列化方式：Kryo
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate()
    }

    //定义变量，表名称、保存路径
    val tableName = "tbl_trips_cow"
    val tablePath = "/hudi-warehouse/tbl_trips_cow"


    //任务一：模拟数据，插入Hudi表，采用COW模式
    //    insertData(spark, tableName, tablePath)

    //任务二：快照方式查询(Snapshot Query)数据，采用DSL方式
    queryData(spark, tablePath)
    //根据时间查询数据
    //    queryDataByTime(spark, tablePath)

    /**
     * 任务三：更新数据update。
     * 第一步：模拟产生数据；
     * 第二步：模拟产生数据，针对第一步数据字段值更新；
     * 第三步：将数据更新到hudi表
     * hudi数据湖框架最大优势就是支持对数据的Upsert操作(插入或更新)
     */
    //    val dataGen : DataGenerator = new DataGenerator()
    //    insertData(spark, tableName, tablePath, dataGen)
    //    updateData(spark, tableName, tablePath, dataGen)

    //任务四：增量查询数据(Incremental Query)，采用SQL方式
    //    incrementalQueryData(spark, tablePath)

    //任务五：删除(Delete)数据
    deleteData(spark, tableName, tablePath)


    //应用结束，关闭资源
    spark.stop()
  }

  /**
   * 官方案例：模拟产生数据，插入Hudi表，采用COW模式
   *
   * @param spark
   * @param table
   * @param path
   */
  def insertData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._
    //1.模拟乘车数据
    //构建数据生成器，模拟产生业务数据
    import org.apache.hudi.QuickstartUtils._
    val dataGen: DataGenerator = new DataGenerator()
    //模拟生成100条，并转成String类型
    val inserts = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF = spark.read.json(
      spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
    )

    insertDF.printSchema()
    insertDF.show(10, false)

    //2.插入数据到Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    insertDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //hudi表的属性值的设置
      //预合并
      .option(PRECOMBINE_FIELD.key(), "ts")
      //主键
      .option(RECORDKEY_FIELD.key(), "uuid")
      //分区
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      //表名
      .option(TBL_NAME.key(), table)
      .save(path)

  }

  /**
   * 采用Snapshot Query快照方式查询表的数据
   *
   * @param spark
   * @param path
   * @return
   */
  def queryData(spark: SparkSession, path: String): Unit = {
    import spark.implicits._
    val tripsDF = spark.read.format("hudi").load(path)
    //    tripsDF.printSchema()
    //    tripsDF.show(10, false)

    //查询费用大于20，小于50的乘车数据
    tripsDF
      .filter($"fare" >= 20 && $"fare" <= 50)
      .select($"driver", $"rider", $"fare", $"begin_lat", $"begin_lon", $"partitionpath", $"_hoodie_commit_time")
      .orderBy($"fare".desc, $"_hoodie_commit_time".desc)
      .show(20, false)
  }

  /**
   * 根据时间点查询数据
   *
   * @param spark
   * @param path
   */
  def queryDataByTime(spark: SparkSession, path: String): Unit = {
    import org.apache.spark.sql.functions._

    //方式一：指定字符串，按照日期时间过滤获取数据
    val df1 = spark.read
      .format("hudi")
      .option("as.of.instant", "20220222214552250")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)

    df1.printSchema()
    df1.show(5, false)

    //方式二：指定字符串，按照日期时间过滤获取数据
    val df2 = spark.read
      .format("hudi")
      .option("as.of.instant", "2022-02-22 21:45:52.250")
      .load(path)
      .sort(col("_hoodie_commit_time").desc)
    df2.printSchema()
    df2.show(5, false)
  }


  /**
   * 官方案例：模拟产生数据，插入Hudi表，采用COW模式
   *
   * @param spark
   * @param table
   * @param path
   * @param dataGen
   */
  def insertData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {
    import spark.implicits._
    //1.模拟乘车数据
    //构建数据生成器，模拟产生业务数据
    import org.apache.hudi.QuickstartUtils._
    //模拟生成100条，并转成String类型
    val inserts = convertToStringList(dataGen.generateInserts(100))

    import scala.collection.JavaConverters._
    val insertDF = spark.read.json(
      spark.sparkContext.parallelize(inserts.asScala, 2).toDS()
    )

    //    insertDF.printSchema()
    //    insertDF.show(10, false)

    //2.插入数据到Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    insertDF.write
      .mode(SaveMode.Overwrite)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //hudi表的属性值的设置
      //预合并
      .option(PRECOMBINE_FIELD.key(), "ts")
      //主键
      .option(RECORDKEY_FIELD.key(), "uuid")
      //分区
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      //表名
      .option(TBL_NAME.key(), table)
      .save(path)

  }


  /**
   * 模拟产生hudi表中更新数据，将其更新到Hudi表中
   *
   * @param spark
   * @param table
   * @param path
   * @param dataGen
   * @return
   */
  def updateData(spark: SparkSession, table: String, path: String, dataGen: DataGenerator): Unit = {

    import spark.implicits._
    //1.模拟乘车数据
    //构建数据生成器，模拟产生业务数据
    import org.apache.hudi.QuickstartUtils._
    //模拟产生100条更新数据，并转成String类型
    val updates = convertToStringList(dataGen.generateUpdates(100))

    import scala.collection.JavaConverters._
    val updateDF = spark.read.json(
      spark.sparkContext.parallelize(updates.asScala, 2).toDS()
    )

    //    updateDF.printSchema()
    //    updateDF.show(10, false)

    //2.插入数据到Hudi表
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    updateDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", "2")
      .option("hoodie.upsert.shuffle.parallelism", "2")
      //hudi表的属性值的设置
      //预合并
      .option(PRECOMBINE_FIELD.key(), "ts")
      //主键
      .option(RECORDKEY_FIELD.key(), "uuid")
      //分区
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      //表名
      .option(TBL_NAME.key(), table)
      .save(path)


  }

  /**
   * 采用incremental query增量方式查询数据，需要指定时间戳
   *
   * @param spark
   * @param path
   */
  def incrementalQueryData(spark: SparkSession, path: String): Unit = {

    import spark.implicits._

    val QUERY_TYPE_OPT_KEY = "hoodie.datasource.query.type"
    val QUERY_TYPE_SNAPSHOT_OPT_VAL = "snapshot"
    val QUERY_TYPE_READ_OPTIMIZED_OPT_VAL = "read_optimized"
    val QUERY_TYPE_INCREMENTAL_OPT_VAL = "incremental"

    val BEGIN_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.begin.instanttime"
    val END_INSTANTTIME_OPT_KEY = "hoodie.datasource.read.end.instanttime"

    //1.加载hudi表数据，获取commit time时间，作为增量查询数据阈值
    spark.read
      .format("hudi")
      .load(path)
      .createOrReplaceTempView("view_temp_hudi_trips")
    val commits: Array[String] = spark
      .sql(
        """
          |SELECT
          | DISTINCT(_hoodie_commit_time) AS commitTime
          |FROM
          | view_temp_hudi_trips
          |ORDER BY
          | commitTime DESC
          |""".stripMargin
      )
      .map(row => row.getString(0))
      .take(50)
    val beginTime = commits(commits.length - 1)
    println(s"beginTime = ${beginTime}")

    //2.设置hudi数据commit time时间阈值，进行增量数据查询
    val tripsIncrementalDF = spark.read
      .format("hudi")
      //设置查询数据模式为：incremental，增量读取
      .option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL)
      //设置增量读取数据时开始时间
      .option(BEGIN_INSTANTTIME_OPT_KEY, beginTime)
      .load(path)

    //3.将增量查询数据注册为临时视图，查询费用大于20的数据
    tripsIncrementalDF.createOrReplaceTempView("hudi_trips_incremental")
    spark
      .sql(
        """
          |SELECT
          | `_hoodie_commit_time`, fare, begin_lon, begin_lat, ts
          |FROM
          | hudi_trips_incremental
          |WHERE
          | fare > 20.0
          |""".stripMargin
      )
      .show(10, false)

  }

  /**
   * 删除hudi表数据，依据主键UUID进行删除，如果是分区表，需要指定分区路径
   *
   * @param spark
   * @param table
   * @param path
   */
  def deleteData(spark: SparkSession, table: String, path: String): Unit = {
    import spark.implicits._

    //1.加载hudi表数据，获取条目数
    val tripsDF: DataFrame = spark.read
      .format("hudi")
      .load(path)
    println(s"Raw Count = ${tripsDF.count()}")

    //2.模拟要删除的数据，从hudi表中加载数据，获取几条数据，转换为要删除的数据集合
    val dataFrame = tripsDF.limit(2).select($"uuid", $"partitionpath")
    import org.apache.hudi.QuickstartUtils._

    val dataGenerator = new DataGenerator()
    val deletes = dataGenerator.generateDeletes(dataFrame.collectAsList())

    import scala.collection.JavaConverters._
    val deleteDF = spark.read.json(spark.sparkContext.parallelize(deletes.asScala, 2))

    //3.保存数据到hudi表中，设置操作类型为DELETE
    import org.apache.hudi.DataSourceWriteOptions._
    import org.apache.hudi.config.HoodieWriteConfig._
    deleteDF.write
      .mode(SaveMode.Append)
      .format("hudi")
      .option("hoodie.insert.shuffle.parallelism", 2)
      .option("hoodie.upsert.shuffle.parallelism", 2)
      //设置数据操作类型为delete，默认值为upsert
      .option(OPERATION.key(), "delete")
      .option(PRECOMBINE_FIELD.key(), "ts")
      .option(RECORDKEY_FIELD.key(), "uuid")
      .option(PARTITIONPATH_FIELD.key(), "partitionpath")
      .option(TBL_NAME.key(), table)
      .save(path)

    //4.再次加载hudi表数据，统计条目数，查看是否减少2条数据
    val verifyDF: DataFrame = spark.read
      .format("hudi")
      .load(path)
    println(s"After Delete Raw Count = ${verifyDF.count()}")

  }
}